Flask不同于Django,Django在创建程序时自动得到必要的目录文件,而Flask则只有一个空文件夹,所以关于Flask项目的目录我们需要自行配置。
首先利用pycharm创建一个项目,在根目录下创建一个app文件夹(app文件)、一个utils文件夹(用于放数据库连接池)、一个manage.py文件和一个settings.py文件。这里我还创建了一个auth文件夹,写了一个验证登录的py文件
首先,我们要在app文件夹下依次创建一下内容:
-
- static文件夹:用于放静态文件
- templates文件夹:用于放置html模板文件
- views文件夹:用于放置视图函数
- __init__.py文件:注册app到启动文件,一会儿我们会在这里大做文章
- models.py文件:用于放置创建表的类
在__init__.py中,需要做的是引入Flask-SQLAlchemy、创建app、设置配置文件、注册蓝图和注册组件
#!/usr/bin/env python# -*- coding:utf-8 -*-from flask import Flaskfrom auth.auth import Authfrom flask_session import Session# 1. 引入Flask-SQLAlchemyfrom flask_sqlalchemy import SQLAlchemydb = SQLAlchemy()#实例化SQLAlchemyfrom .views.account import accountfrom .views.main import mainfrom .views.user import user# 3. 导入models中的表from .models import *#创建appdef create_app(): app = Flask(__name__) app.debug = True app.secret_key = 'sdiusdfsdf'#自定义的session秘钥 # 设置配置文件 app.config.from_object('settings.DevelopmentConfig') # 注册蓝图 app.register_blueprint(account) app.register_blueprint(user) app.register_blueprint(main) # 注册组件 # Session(app) Auth(app) # 2. 注册 Flask-SQLAlchemy # 这个对象在其他地方想要使用 # SQLAlchemy(app) db.init_app(app) return app#得到了一个app
然后,在启动文件manage.py中导入创建的app,创建自定义命令后写下run
#!/usr/bin/env python# -*- coding:utf-8 -*-"""被毙掉了 # 5. 创建和删除表 以后执行db.create_all() 以后执行db.drop_all()新第5步: 安装 pip3 install Flask-Migrate"""import osfrom flask_script import Manager, Server# 5.1 导入from flask_migrate import Migrate, MigrateCommandfrom app import create_app, dbapp = create_app()manager = Manager(app)# 5.2 创建migrate示例migrate = Migrate(app, db)@manager.commanddef custom(arg): """ 自定义命令 python manage.py custom 123 :param arg: :return: """ print(arg)@manager.option('-n', '--name', dest='name')@manager.option('-u', '--url', dest='url')def cmd(name, url): """ 自定义命令 执行: python manage.py cmd -n wupeiqi -u http://www.oldboyedu.com :param name: :param url: :return: """ print(name, url)@manager.commanddef import_news(path): """ 批量导入 :param name: :param url: :return: """ import xlrd from xlrd.book import Book from xlrd.sheet import Sheet from xlrd.sheet import Cell workbook = xlrd.open_workbook(path) sheet_names = workbook.sheet_names() # sheet = workbook.sheet_by_name('工作表1') sheet = workbook.sheet_by_index(0) # 循环Excel文件的所有行 for row in sheet.get_rows(): # print(row) # 循环一行的所有列 for col in row: # 获取一个单元格中的值 print(col.value, end=';') print('')"""# 数据库迁移命名 python manage.py db init python manage.py db migrate python manage.py db upgrade"""# 5.3 创建db命令,以后在数据库操作的时候都可使用dbmanager.add_command('db', MigrateCommand)"""# 自定义命令 python manage.py runserver """manager.add_command("runserver", Server())"""生成当前环境的所有依赖: requirements.txt pip3 freeze > requirements.txt 生成当前程序的所有依赖: requirements.txt pip3 install pipreqs pipreqs ./ """if __name__ == "__main__": manager.run()
接下来在settings.py文件中配置好相应的数据库信息,在utils文件夹下创建文件,写入数据库连接池的内容
#!/usr/bin/env python# -*- coding:utf-8 -*-import redisclass BaseConfig(object): SESSION_TYPE = 'redis' # session类型为redis SESSION_KEY_PREFIX = 'session:' # 保存到session中的值的前缀 SESSION_PERMANENT = False # 如果设置为True,则关闭浏览器session就失效。 SESSION_USE_SIGNER = False # 是否对发送到浏览器上 session:cookie值进行加密 SQLALCHEMY_DATABASE_URI = "mysql+pymysql://root:123@127.0.0.1:3306/flask_cata?charset=utf8" #数据库用户名:密码@host:port/数据库名?编码 SQLALCHEMY_POOL_SIZE = 2 SQLALCHEMY_POOL_TIMEOUT = 30 SQLALCHEMY_POOL_RECYCLE = -1 # 追踪对象的修改并且发送信号 SQLALCHEMY_TRACK_MODIFICATIONS = Falseclass ProductionConfig(BaseConfig): passclass DevelopmentConfig(BaseConfig): passclass TestingConfig(BaseConfig): pass
import timeimport pymysqlimport threadingfrom DBUtils.PooledDB import PooledDB, SharedDBConnectionPOOL = PooledDB( creator=pymysql, # 使用链接数据库的模块 maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数 mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建 maxcached=5, # 链接池中最多闲置的链接,0和None不限制 maxshared=3, # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。 blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错 maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制 setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always # host='47.93.4.198', host='127.0.0.1',#本机host port=3306,#数据库默认端口即3306 user='root',#数据库用户名 password='123',#数据库密码 database='flask_cata',#数据库名 charset='utf8')
"""# 方式一# helper = SQLHelper()# helper.open()# result = helper.fetchone('select * from users where name=%s and pwd = %s',[request.form.get('user'),request.form.get('pwd'),])# helper.close()# 方式二:# with SQLHelper() as helper:# result = helper.fetchone('select * from users where name=%s and pwd = %s',[request.form.get('user'),request.form.get('pwd'),])# if result:# current_app.auth_manager.login(result['name'])# return redirect('/index')"""from utils.pool import db_poolimport pymysqlclass SQLHelper(object): def __init__(self): self.conn = None self.cursor = None def open(self, cursor=pymysql.cursors.DictCursor): self.conn = db_pool.POOL.connection() self.cursor = self.conn.cursor(cursor=cursor) def close(self): self.cursor.close() self.conn.close() def fetchone(self, sql, params): cursor = self.cursor cursor.execute(sql, params) result = cursor.fetchone() return result def fetchall(self, sql, params): cursor = self.cursor cursor.execute(sql, params) result = cursor.fetchall() return result def __enter__(self): self.open() return self def __exit__(self, exc_type, exc_val, exc_tb): self.close() # with SQLHelper() as obj: # # print(obj) # print('正在执行')
到这里,程序的基本框架就设定好了。
接下来就是具体项目的内容,这里举一个简单的栗子
在models.py中创建表
"""# 方式一# helper = SQLHelper()# helper.open()# result = helper.fetchone('select * from users where name=%s and pwd = %s',[request.form.get('user'),request.form.get('pwd'),])# helper.close()# 方式二:# with SQLHelper() as helper:# result = helper.fetchone('select * from users where name=%s and pwd = %s',[request.form.get('user'),request.form.get('pwd'),])# if result:# current_app.auth_manager.login(result['name'])# return redirect('/index')"""from utils.pool import db_poolimport pymysqlclass SQLHelper(object): def __init__(self): self.conn = None self.cursor = None def open(self, cursor=pymysql.cursors.DictCursor): self.conn = db_pool.POOL.connection() self.cursor = self.conn.cursor(cursor=cursor) def close(self): self.cursor.close() self.conn.close() def fetchone(self, sql, params): cursor = self.cursor cursor.execute(sql, params) result = cursor.fetchone() return result def fetchall(self, sql, params): cursor = self.cursor cursor.execute(sql, params) result = cursor.fetchall() return result def __enter__(self): self.open() return self def __exit__(self, exc_type, exc_val, exc_tb): self.close() # with SQLHelper() as obj: # # print(obj) # print('正在执行')
通过python manage.py db init python manage.py db migrate python manage.py db upgrade三条命令进行数据库迁移创建表,得到migrations文件夹
在views文件夹下创建视图函数,并配置路由。在templates文件夹下创建模板html,在static下创建静态文件,基本项目就成型了
views:
#!/usr/bin/env python# -*- coding:utf-8 -*-from flask import Blueprint, request, render_template, redirect, session, current_appfrom app import modelsfrom app import dbfrom sqlalchemy import ormaccount = Blueprint('account', __name__)@account.route('/login', methods=['GET', 'POST'])def login(): if request.method == 'GET': """ 执行SQL 方式一: result = db.session.query(models.User.id,models.User.name).all() db.session.remove() 方式二: result = models.Users.query.all() """ """ 示例: # 通配符 ret = db.session.query(Users).filter(Users.name.like('e%')).all() ret = db.session.query(Users).filter(~Users.name.like('e%')).all() # 限制 ret = db.session.query(Users)[1:2] # 排序 ret = db.session.query(Users).order_by(Users.name.desc()).all() ret = db.session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all() db.session.remove() """ return render_template('login.html') else: return render_template('login.html')
#!/usr/bin/env python# -*- coding:utf-8 -*-from flask import Blueprint, request, render_template, redirectmain = Blueprint('main', __name__)@main.route('/index')def index(): # with SQLHelper() as helper: # user_list = helper.fetchall('select * from users',[]) return render_template('index.html')
#!/usr/bin/env python# -*- coding:utf-8 -*-from flask import Blueprintuser = Blueprint('user', __name__)@user.route('/users')def users(): return 'users'
templates:
{% extends 'layout.html' %}{ % block title %}首页{% endblock %}{ % block body %}欢迎登录:{ { session.get('user') }}
- { % for row in user_list %}
- { { row }} { % endfor %}
{% block title %}{% endblock %} { % block css %}{% endblock %}{ % block body %}{% endblock %}{ % block js %}{% endblock %}
Title
附:
alembic==0.9.6blinker==1.4certifi==2017.11.5chardet==3.0.4click==6.7DBUtils==1.2docopt==0.6.2Flask==0.12.2Flask-Migrate==2.1.1Flask-Script==2.0.6Flask-Session==0.3.1Flask-SQLAlchemy==2.3.2idna==2.6itsdangerous==0.24Jinja2==2.10Mako==1.0.7MarkupSafe==1.0pipreqs==0.4.9PyMySQL==0.8.0python-dateutil==2.6.1python-editor==1.0.3PyTyrion==1.0.1requests==2.18.4six==1.11.0SQLAlchemy==1.2.0urllib3==1.22Werkzeug==0.14.1WTForms==2.1xlrd==1.1.0yarg==0.1.9
#!/usr/bin/env python# -*- coding:utf-8 -*-from flask import request, session,redirect# 登录成功时 _request_ctx_stack.top.user = userclass Auth(object): def __init__(self, app=None): self.app = app if app: self.init_app(app) def init_app(self, app): app.auth_manager = self app.before_request(self.check_login) app.context_processor(self.auth_context_processor) def auth_context_processor(self): name = session.get('user') return dict(current_user=name) def check_login(self): # print(request.url) # print(request.path) if request.path == '/login': return None if session.get('user'): return None return redirect('/login') def permission(self): pass def login(self,data): session['user'] = data